package fun.easycode.datastream;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.baomidou.dynamic.datasource.creator.DefaultDataSourceCreator;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DataSourceProperty;
import fun.easycode.datastream.repository.TaskRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.rocketmq.client.exception.MQClientException;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;

/**
 * Auto-configuration for data stream.
 *
 * <p>Registers the beans declared below (Canal connector/adapter, the full and
 * incremental task managers, the Curator client, the data source switching advisor)
 * and, after dependency injection, adds an extra data source named
 * {@code "data-stream"} to the application's {@link DynamicRoutingDataSource}.
 *
 * @author xuzhen97
 */
@EnableConfigurationProperties(DataStreamProperties.class)
@Configuration
@Slf4j
@MapperScan(basePackages = "fun.easycode.datastream.repository")
@MapperScanner(basePackages = "${data-stream.dao-mapper-scan}")
public class DataStreamAutoConfiguration implements WebMvcConfigurer {

    /** The application's primary data source; must be a {@link DynamicRoutingDataSource}. */
    @Resource
    private DataSource dataSource;
    /** Factory used to build the concrete data-stream data source. */
    @Resource
    private DefaultDataSourceCreator dataSourceCreator;
    /** Bound data stream configuration. */
    @Resource
    private DataStreamProperties dataStreamProperties;

    /**
     * Builds the data source described by {@link DataStreamProperties#getDataSource()}
     * and registers it on the routing data source under the name {@code "data-stream"}.
     *
     * @throws IllegalStateException if the injected {@link DataSource} is not a
     *                               {@link DynamicRoutingDataSource}
     */
    @PostConstruct
    public void init(){
        log.info("start data stream auto configuration init.");

        // Fail fast, before any data source is created, with a message that names
        // the actual type instead of surfacing a bare ClassCastException.
        if (!(dataSource instanceof DynamicRoutingDataSource)) {
            String actualType = dataSource == null ? "null" : dataSource.getClass().getName();
            throw new IllegalStateException(
                    "data stream requires the primary DataSource to be a "
                            + DynamicRoutingDataSource.class.getName()
                            + ", but found: " + actualType);
        }
        DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;

        // Copy the connection settings from the data stream configuration.
        DataSourceProperty dataSourceProperty = new DataSourceProperty();
        dataSourceProperty.setUrl(dataStreamProperties.getDataSource().getUrl());
        dataSourceProperty.setUsername(dataStreamProperties.getDataSource().getUsername());
        dataSourceProperty.setPassword(dataStreamProperties.getDataSource().getPassword());
        dataSourceProperty.setDriverClassName(dataStreamProperties.getDataSource().getDriverClassName());

        // Create the data source.
        DataSource creatorDataSource = dataSourceCreator.createDataSource(dataSourceProperty);
        // Register it dynamically on the routing data source.
        ds.addDataSource("data-stream", creatorDataSource);
        log.info("end data stream auto configuration init.");
    }

    /**
     * Default Canal row data parser (per the original author's note: RowData to DataEntry).
     *
     * @return a {@code CanalRowDataParser.DefaultCanalRowDataParser}
     */
    @Bean
    public CanalRowDataParser<?> rowDataConverter(){
        return new CanalRowDataParser.DefaultCanalRowDataParser<>();
    }

    /**
     * Canal connector in cluster mode, created from the configured ZooKeeper address.
     *
     * <p>Only registered when {@code data-stream.canal.cluster=true}.
     * NOTE(review): neither this bean nor {@link #canalSingleConnector} is registered
     * when the property is absent, because neither condition sets {@code matchIfMissing}.
     *
     * @param quickProperties data stream configuration
     * @return CanalConnector
     */
    @Bean
    @ConditionalOnProperty(prefix = "data-stream.canal", name = "cluster", havingValue = "true")
    public CanalConnector canalClusterConnector(DataStreamProperties quickProperties){

        log.info("canal cluster connector bean.");
        return CanalConnectors.newClusterConnector(quickProperties.getCanal().getZookeeperAddress()
                , quickProperties.getCanal().getDestination()
                , quickProperties.getCanal().getUsername(), quickProperties.getCanal().getPassword());
    }

    /**
     * Canal connector in single-node mode, created from the configured host and port.
     *
     * <p>Only registered when {@code data-stream.canal.cluster=false}.
     *
     * @param quickProperties data stream configuration
     * @return CanalConnector
     */
    @Bean
    @ConditionalOnProperty(prefix = "data-stream.canal", name = "cluster", havingValue = "false")
    public CanalConnector canalSingleConnector(DataStreamProperties quickProperties){
        log.info("canal single connector bean.");

        InetSocketAddress canalAddress = new InetSocketAddress(quickProperties.getCanal().getHost(),
                quickProperties.getCanal().getPort());
        return CanalConnectors.newSingleConnector(canalAddress
                , quickProperties.getCanal().getDestination()
                , quickProperties.getCanal().getUsername(), quickProperties.getCanal().getPassword());
    }

    /**
     * Canal adapter (per the original author's note: connects Canal with this framework).
     *
     * @param canalConnector   the active Canal connector
     * @param rowDataConverter the row data parser
     * @param quickProperties  data stream configuration
     * @return CanalAdapter
     */
    @Bean
    public CanalAdapter canalAdapter(CanalConnector canalConnector, CanalRowDataParser<?> rowDataConverter
            , DataStreamProperties quickProperties){
        return new CanalAdapter(canalConnector, rowDataConverter, quickProperties);
    }

    /**
     * Data stream context.
     *
     * @param quickQuantityManager      incremental task manager
     * @param canalAdapter              Canal adapter
     * @param curatorFramework          Curator client
     * @param dataCompleteStartConsumer consumer of full-task start messages
     * @param dataCompleteStartProducer producer of full-task start messages
     * @return DataContext
     */
    @Bean
    public DataContext quickContext(DataQuantityManager quickQuantityManager
            , CanalAdapter canalAdapter, CuratorFramework curatorFramework
            , DataCompleteStartConsumer dataCompleteStartConsumer, DataCompleteStartProducer dataCompleteStartProducer){
        return new DataContext(quickQuantityManager, canalAdapter, curatorFramework
                , dataCompleteStartConsumer, dataCompleteStartProducer);
    }

    /**
     * Executor for full tasks (described by the original author as a wrapped thread pool).
     *
     * @return DataCompleteExecutor
     */
    @Bean
    public DataCompleteExecutor dataCompleteExecutor(){
        return new DataCompleteExecutor();
    }

    /**
     * Consumer of full-task start messages.
     *
     * @param dataStreamProperties configuration
     * @return DataCompleteStartConsumer
     * @throws MQClientException on MQ client errors
     */
    @Bean
    public DataCompleteStartConsumer dataCompleteStartConsumer(DataStreamProperties dataStreamProperties) throws MQClientException {
        return new DataCompleteStartConsumer(dataCompleteExecutor(), taskRepository(), dataStreamProperties);
    }

    /**
     * Producer of full-task start messages.
     *
     * @param dataStreamProperties configuration
     * @return DataCompleteStartProducer
     */
    @Bean
    public DataCompleteStartProducer dataCompleteStartProducer(DataStreamProperties dataStreamProperties)  {
        return new DataCompleteStartProducer(dataStreamProperties);
    }

    /**
     * Data stream full task manager.
     *
     * @param taskRepository            task repository
     * @param dataCompleteStartProducer producer of full-task start messages
     * @return DataCompleteManager
     */
    @Bean
    public DataCompleteManager quickCompleteManager(TaskRepository taskRepository
            , DataCompleteStartProducer dataCompleteStartProducer){
        return new DataCompleteManager(taskRepository, dataCompleteStartProducer);
    }

    /**
     * Data stream incremental task manager.
     *
     * @param quickProperties data stream configuration
     * @param canalAdapter    Canal adapter
     * @return DataQuantityManager
     */
    @Bean
    public DataQuantityManager quickQuantityManager(DataStreamProperties quickProperties
            , CanalAdapter canalAdapter){
        return new DataQuantityManager(canalAdapter, quickProperties);
    }

    /**
     * Advice used by {@link #dsAdviceAdvisor} for the configured DAO package.
     *
     * @return DynamicDatasourceInterceptor
     */
    @Bean
    public DynamicDatasourceInterceptor dsAdvice() {
        return new DynamicDatasourceInterceptor();
    }

    /**
     * Advisor that applies {@code dsAdvice} to every method in the configured DAO
     * package and its sub-packages.
     *
     * @param dsAdvice   the data source switching advice
     * @param properties data stream configuration (supplies the DAO package)
     * @return Advisor
     */
    @Bean
    public Advisor dsAdviceAdvisor(DynamicDatasourceInterceptor dsAdvice, DataStreamProperties properties) {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        // Pointcut expression.
        // Per the original author's note: the sub-package name under the DAO package is
        // the data source name, so a matching name switches the data source automatically.
        pointcut.setExpression("execution (* " + properties.getDaoPackage() + "..*.*(..))");
        return new DefaultPointcutAdvisor(pointcut, dsAdvice);
    }

    /**
     * Curator client pointed at the configured Canal ZooKeeper address.
     *
     * <p>This method only builds the client; it does not call {@code start()}.
     * NOTE(review): confirm the client is started elsewhere. The ZooKeeper address is
     * also read here when Canal runs in single-node mode.
     *
     * @param properties data stream configuration
     * @return CuratorFramework
     */
    @Bean
    public CuratorFramework curatorFramework(DataStreamProperties properties){
        // Retry policy: base sleep time of 3000 ms, at most 3 retries.
        RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
        // Build the Curator client through the factory.
        return CuratorFrameworkFactory.builder()
                .connectString(properties.getCanal().getZookeeperAddress())
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();
    }

    /**
     * Task repository.
     *
     * @return TaskRepository
     */
    @Bean
    public TaskRepository taskRepository(){
        return new TaskRepository();
    }

    /**
     * Registers a {@code DSInterceptor} for all requests under {@code /data-stream/**}.
     *
     * @param registry the MVC interceptor registry
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new DSInterceptor()).addPathPatterns("/data-stream/**");
    }
}
